加工数据

本文为您介绍如何使用DataWorks中的EMR Hive节点,对同步至OSS的用户信息表(ods_user_info_d_emr)及访问日志数据表(ods_raw_log_d_emr)中的数据进行加工,进而得到目标用户画像数据。

前提条件

开始本实验前,请首先完成同步数据中的操作。

步骤一:设计业务流程

业务流程节点间依赖关系的配置请参见同步数据

双击新建的业务流程打开编辑页面,鼠标单击EMR Hive并拖拽至右侧的编辑页面。在新建节点对话框中,输入节点名称,单击提交

此处需要新建3EMR Hive节点,依次命名为dwd_log_info_di_emrdws_user_info_all_di_emrads_user_info_1d_emr,并配置如下图所示的依赖关系。节点dwd_log_info_di_emr用于对原始OSS日志数据进行清洗,节点dws_user_info_all_di_emr用于将清洗后的日志数据和用户基本信息数据进行汇总,节点ads_user_info_1d_emr用于生成最终用户画像数据。

image

步骤二:创建函数

根据同步的原始日志数据格式,我们需要通过函数等方式将其拆解为目标格式。本案例已为您提供用于将IP解析为地域的函数代码包。您可以将代码包下载至本地,并在DataWorks注册为函数后,即可调用该函数。

上传资源

  1. 下载ip2region-emr.jar

  2. 数据开发页面打开WorkShop业务流程,右键单击EMR,选择新建资源 > EMR JAR,配置新建资源参数。image

    关键参数配置如下:

    • 存储路径:选择准备环境中创建的Bucket

    • 上传文件:选择已下载ip2region-emr.jar文件。

    其他参数保持默认或根据实际情况配置。

  3. 单击工具栏image.png,将资源提交至开发环境对应的EMR引擎项目。

注册函数

  1. 数据开发页面打开业务流程,右键单击EMR,选择新建函数

  2. 新建函数对话框中,函数名称输入getregion,单击新建,配置函数信息。image

    关键参数配置如下:

    • 所属资源:选择ip2region-emr.jar。

    • 类名:输入org.alidata.emr.udf.Ip2Region。

    其他参数保持默认或根据实际情况配置。

  3. 单击工具栏image.png,将函数提交至开发环境对应的EMR引擎项目。

步骤三:配置EMR Hive节点

新建dwd_log_info_di_emr节点

1. 编辑代码

双击dwd_log_info_di_emr节点,进入节点配置页面。在节点编辑页面,编写如下语句。

说明

如果您工作空间的数据开发中绑定多个EMR引擎,请按需选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。

--创建ODS层表
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
  ip STRING COMMENT 'ip地址',
  uid STRING COMMENT '用户ID',
  `time` STRING COMMENT '时间yyyymmddhh:mi:ss',
  status STRING COMMENT '服务器返回状态码',
  bytes STRING COMMENT '返回给客户端的字节数',
  region STRING COMMENT '地域,根据ip得到',
  method STRING COMMENT 'http请求类型',
  url STRING COMMENT 'url',
  protocol STRING COMMENT 'http协议版本号',
  referer STRING COMMENT '来源url',
  device STRING COMMENT '终端类型 ',
  identity STRING COMMENT '访问类型 crawler feed user unknown'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT ip
  , uid
  , tm
  , status
  , bytes 
  , getregion(ip) AS region --使用自定义UDF通过ip得到地域。 
  , regexp_extract(request, '(^[^ ]+) .*') AS method --通过正则把request差分为三个字段。
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  --通过正则清洗refer,得到更精准的url。
  , CASE
    WHEN lower(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。
    WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN lower(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^[Mozilla|Opera]'
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
  FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS tm
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
    FROM ods_raw_log_d_emr
  WHERE dt = '${bizdate}'
) a;

2. 配置调度属性

通过以下配置实现调度场景下,每日00:30待上游ods_raw_log_d_emr节点将存储于OSSuser_log.txt数据同步至EMRods_raw_log_d_emr表后,可触发当前dwd_log_info_di_emr节点对ods_raw_log_d_emr表数据进行加工,加工结果写入dwd_log_info_di_emr表对应业务时间分区。

配置项

配置内容

图示

新增参数

调度参数项中单击新增参数,添加:

  • 参数名:bizdate

  • 参数值:$[yyyymmdd-1]

image

调度依赖

调度依赖确认产出表已作为本节点输出。

格式为worksspacename.节点名

image

说明

时间属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_emr的定时调度时间控制,即每日00:30后才会调度。

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

新建dws_user_info_all_di_emr节点

1. 编辑代码

双击dws_user_info_all_di_emr节点,进入节点配置页面。在节点编辑页面,编写如下语句。

说明

如果您工作空间的数据开发中绑定多个EMR引擎,请按需选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。

--创建DW层表
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
  uid STRING COMMENT '用户ID',
  gender STRING COMMENT '性别',
  age_range STRING COMMENT '年龄段',
  zodiac STRING COMMENT '星座',
  region STRING COMMENT '地域,根据ip得到',
  device STRING COMMENT '终端类型 ',
  identity STRING COMMENT '访问类型 crawler feed user unknown',
  method STRING COMMENT 'http请求类型',
  url STRING COMMENT 'url',
  referer STRING COMMENT '来源url',
  `time` STRING COMMENT '时间yyyymmddhh:mi:ss'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.`time`
FROM (
  SELECT *
  FROM dwd_log_info_di_emr
  WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d_emr
  WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;

2. 配置调度属性

通过以下配置实现调度场景下。每日00:30待上游任务ods_user_info_d_emrdwd_log_info_di_emr执行完成后,可触发dws_user_info_all_di_emr节点对ods_user_info_d_emrdwd_log_info_di_emr两表进行合并处理,并写入dws_user_info_all_di_emr表中。

配置项

配置内容

图示

新增参数

调度参数项中单击新增参数,添加:

  • 参数名:bizdate

  • 参数值:$[yyyymmdd-1]

image

调度依赖

调度依赖确认产出表已作为本节点输出。

格式为worksspacename.节点名

image

说明

时间属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_emr的定时调度时间控制,即每日00:30后才会调度。

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

新建ads_user_info_1d_emr节点

1. 编辑代码

双击ads_user_info_1d_emr节点,进入节点配置页面。在节点编辑页面,编写如下语句。

说明

如果您工作空间的数据开发中绑定多个EMR引擎,请按需选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。

--创建RPT层表
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
  uid STRING COMMENT '用户ID',
  region STRING COMMENT '地域,根据ip得到',
  device STRING COMMENT '终端类型 ',
  pv BIGINT COMMENT 'pv',
  gender STRING COMMENT '性别',
  age_range STRING COMMENT '年龄段',
  zodiac STRING COMMENT '星座'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;

调度配置

在上游dws_user_info_all_di_emr节点任务将ods_user_info_d_emr表和dwd_log_info_di_emr表合并后完成后,可触发ads_user_info_1d_emr节点任务,进一步加工数据生成可消费数据。

配置项

配置内容

图示

新增参数

调度参数项中单击新增参数,添加:

  • 参数名:bizdate

  • 参数值:$[yyyymmdd-1]

image

调度依赖

调度依赖确认产出表已作为本节点输出。

格式为worksspacename.节点名

image

说明

时间属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_emr的定时调度时间控制,即每日00:30后才会调度。

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

步骤四:提交业务流程

完成业务流程所有配置后,测试该流程是否能正常运行,测试成功后,需要提交流程等待发布。

  1. 在业务流程的编辑页面,单击运行,运行业务流程。

  2. 待业务流程中的所有节点后出现成功,单击提交,提交运行成功的业务流程。

  3. 选择提交对话框中需要提交的节点,勾选忽略输入输出不一致的告警

  4. 单击提交

  5. 提交成功后,即可在发布页面发布流程节点。

步骤五:在生产环境运行任务

任务发布后,在次日才会生成实例运行,您可以通过补数据来对已发布流程进行补数据操作,以便查看任务在生产环境是否可以运行,详情可参见执行补数据并查看补数据实例(新版)

  1. 任务发布成功后,单击右上角的运维中心

    您也可以进入业务流程的编辑页面,单击工具栏中的前往运维,进入运维中心页面。

  2. 单击左侧导航栏中的周期任务运维 > 周期任务,进入周期任务页面,单击workshop_start_emr虚节点。

  3. 在右侧的DAG图中,右键单击workshop_start_emr节点,选择补数据 > 当前节点及下游节点

  4. 勾选需要补数据的任务,输入业务日期,单击确定,自动跳转至补数据实例页面。

  5. 单击刷新,直至SQL任务全部运行成功即可。

后续步骤

任务周期性调度场景下,为保障任务产出的表数据符合预期,我们可以对任务产出的表数据进行数据质量监控,详情请参见配置数据质量监控